/**
 * @project: parallel-task
 * @package: com.ngplat.paralleltask.forkjoin
 * @filename: ForkJoinProcessor.java
 *
 * Copyright (c) 2018 eSunny Info. Tech Ltd. All rights reserved.
 * 
 */
package com.ngplat.paralleltask.forkjoin;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ngplat.paralleltask.constants.TaskConsts;
import com.ngplat.paralleltask.exception.TaskExecutionException;
import com.ngplat.paralleltask.task.TaskContext;
import com.ngplat.paralleltask.task.TaskPoolManager;
import com.ngplat.paralleltask.task.TaskProcessor;
import com.ngplat.paralleltask.utils.ListUtils;

/**
 * @typename: ForkJoinProcessor
 * @brief: ForkJoin处理器
 * @author: KI ZCQ
 * @date: 2018年6月1日 下午5:35:05
 * @version: 1.0.0
 * @since
 * 
 */
@SuppressWarnings("rawtypes")
public abstract class ForkJoinProcessor<T> extends TaskProcessor {

	// serialVersionUID
	private static final long serialVersionUID = 2586083836191570051L;

	private static Logger logger = LoggerFactory.getLogger(ForkJoinProcessor.class);

	// 默认子列表大小
	public final static int DEFAULT_SUB_LIST_SIZE = 50000;
	protected int subListSize = DEFAULT_SUB_LIST_SIZE;

	// 源数据列表
	protected List<T> srcList;

	/**
	 * 仅需要知道使用谁执行逻辑即可
	 */
	protected final Class<AbstractRecursiveWorker> clazz;

	/**
	 * 创建一个新的实例 ForkJoinProcessor.
	 */
	@SuppressWarnings("unchecked")
	public ForkJoinProcessor(Class<?> clazz) {
		// 判断是否是AbstractRecursiveWorker的子类
		if (AbstractRecursiveWorker.class.isAssignableFrom(clazz)) {
			this.clazz = (Class<AbstractRecursiveWorker>) clazz;
		} else {
			throw new IllegalArgumentException("The Argument is not AssignableFrom AbstractRecursiveWorker.");
		}
		
		// 初始化列表
		srcList = new ArrayList<>();
	}

	/**
	 * @brief 重写runWorker, fork->execute->join
	 * @see com.ngplat.paralleltask.task.TaskProcessor#runWorker(com.ngplat.paralleltask.task.TaskContext)
	 */
	@Override
	public void runWorker(TaskContext context) throws TaskExecutionException {
		// 大列表切分成小列表, 分任务执行
		List<List<T>> subLists = ListUtils.splitList(srcList, subListSize);
		// 判空处理
		if (subLists == null || subLists.size() == 0) {
			throw new TaskExecutionException(
					new NullPointerException(String.format("{%s}切分任务子列表不可为空, 请检查!", this.getClass().getName())));
		}

		// 子任务个数
		int subTaskSize = subLists.size();
		// 用于统计任务完成情况
		CountDownLatch countDownLatch = new CountDownLatch(subTaskSize);

		// 创建任务
		for (int idx = 0; idx < subTaskSize; idx++) {
			// 子任务需要处理的数据
			List<T> subList = subLists.get(idx);
			try {
				AbstractRecursiveWorker worker = newWorker(subList, countDownLatch);
				// 设置名称
				worker.setTaskName(this.getClass().getSimpleName() + TaskConsts.TASK_NAME_SEPARATOR
						+ clazz.getSimpleName() + TaskConsts.TASK_NAME_SEPARATOR + idx);
				// 提交线程池执行该任务, 不关心返回结果
				TaskPoolManager.DEFAULT.invoke(worker);
			} catch (Exception e) {
				logger.error("[ForkJoinProcessor:runWorker]执行异常, 异常信息: {}", e);
				throw new TaskExecutionException(e);
			}
		}

		// 等待子任务完成
		try {
			countDownLatch.await();
		} catch (InterruptedException e) {
			logger.error("[ForkJoinProcessor->countDownLatch.await()]await异常, 异常信息: {}", e);
			throw new TaskExecutionException(e);
		}
	}

	/**
	 * 
	 * @Description: 创建可执行任务
	 * @param list
	 * @param countDownLatch
	 *            用于通知任务完成
	 * @return
	 */
	@SuppressWarnings("unchecked")
	private AbstractRecursiveWorker<T> newWorker(List<T> list, CountDownLatch countDownLatch)
			throws NoSuchMethodException, SecurityException, InstantiationException, IllegalAccessException,
			IllegalArgumentException, InvocationTargetException {
		// 构造函数
		Constructor<AbstractRecursiveWorker> constructor = clazz.getConstructor(TaskContext.class, List.class, CountDownLatch.class);
		// 创建实例
		AbstractRecursiveWorker worker = constructor.newInstance(this.context, list, countDownLatch);
		// 实例对象
		return worker;
	}

}
